package org.eocencle.magnet.spark2.component;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.eocencle.magnet.core.component.*;
import org.eocencle.magnet.core.context.ComponentFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * Spark implementation of the repartition work stage.
 * @author: huan
 * @Date: 2020-08-15
 * @Description: Repartitions the dataset of a referenced previous result and
 * publishes the repartitioned data as this stage's result.
 */
public class SparkRepartitionWorkStage extends RepartitionWorkStage {
    /**
     * Intentionally a no-op: this stage does nothing with the handler.
     *
     * @param handler the work stage handler (unused)
     */
    @Override
    public void initHandler(WorkStageHandler handler) {
        // nothing to initialise
    }

    /**
     * Runs the repartition stage.
     *
     * <p>Looks up the previous result named by {@code repartitionInfo.getRef()}
     * on the parent, repartitions its dataset using
     * {@code repartitionInfo.getNum()}, and wraps the repartitioned dataset
     * (plus its RDD view) in a new result carrying this stage's id and alias.
     *
     * @param parameter the work stage parameter (not read by this implementation)
     * @return a mutable list holding exactly one result
     */
    @Override
    public List<WorkStageResult> execute(WorkStageParameter parameter) {
        final ComponentFactory componentFactory = WorkStageComponentBuilderAssistant.getFactory();

        // Resolve the upstream result this stage points at
        final SparkWorkStageResult upstream =
                (SparkWorkStageResult) getParent().getPrevResult(repartitionInfo.getRef());

        // Repartition the upstream dataset, then expose it as an RDD as well
        final Dataset<Row> repartitioned = upstream.getDs().repartition(repartitionInfo.getNum());
        final JavaRDD<Row> repartitionedRdd = repartitioned.toJavaRDD();

        // Populate the outgoing result
        final SparkWorkStageResult outcome =
                (SparkWorkStageResult) componentFactory.createWorkStageResult();
        outcome.setId(repartitionInfo.getId());
        outcome.setAlias(repartitionInfo.getAlias());
        outcome.setRdd(repartitionedRdd);
        outcome.setDs(repartitioned);

        return wrapInList(outcome);
    }

    /**
     * Wraps a single result in a fresh, mutable list.
     *
     * @param single the result to wrap
     * @return a new {@link ArrayList} containing only {@code single}
     */
    private static List<WorkStageResult> wrapInList(WorkStageResult single) {
        final List<WorkStageResult> results = new ArrayList<>();
        results.add(single);
        return results;
    }
}
